package com.huan.source

import com.huan.table.SensorReading
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

/**
 * Demo job entry point: attaches a user-defined source ([[MySensorSource]])
 * to a streaming environment and prints every emitted record to stdout.
 */
object The_custom_Source {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Run with parallelism 1 for the whole job.
    streamEnv.setParallelism(1)

    // Custom source wired directly to the print sink.
    streamEnv
      .addSource(new MySensorSource())
      .print()

    streamEnv.execute("The_custom_Source")
  }
}

/**
 * Custom Flink source that simulates ten sensors ("sensor_1" .. "sensor_10").
 *
 * Each sensor starts at a random value in [0, 100). Roughly once per second
 * every value is nudged by a Gaussian-distributed delta and emitted as a
 * `SensorReading` built from (sensor id, current wall-clock millis, value).
 * Emission continues until [[cancel]] is called.
 */
class MySensorSource() extends SourceFunction[SensorReading] {

  // Flag indicating whether the source should keep emitting data.
  // It is written by cancel() and read by the loop in run(); Flink invokes
  // cancel() from a different thread than the one executing run(), so the
  // flag must be volatile for the update to be reliably visible to the loop.
  @volatile var isRunning: Boolean = true

  /**
   * Emits one reading per simulated sensor, then sleeps for one second,
   * repeating for as long as `isRunning` is true.
   *
   * @param ctx context used to emit records downstream
   */
  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Random number generator for both the initial values and the per-tick drift.
    val random = new Random()

    // Initial (sensorId, value) pairs for the ten simulated sensors.
    var curTemp = (1 to 10).map(i => (s"sensor_$i", random.nextDouble() * 100))

    // Loop until cancelled, continuously producing data.
    while (isRunning) {
      // Slightly adjust each value based on the previous tick.
      curTemp = curTemp.map { case (sensorId, value) =>
        (sensorId, value + random.nextGaussian())
      }
      // All readings of one tick share the same timestamp.
      val curTime = System.currentTimeMillis()

      curTemp.foreach { case (sensorId, value) =>
        ctx.collect(SensorReading(sensorId, curTime, value))
      }
      // Pause between ticks.
      Thread.sleep(1000)
    }
  }

  /** Requests the emit loop in [[run]] to stop by clearing the running flag. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
